#!/usr/bin/env python3

"""Zero dependency shell spawner program
Connects JupyterLab terminal to an SSH daemon
running on an environment shell container.
"""

import json
import os
import subprocess
import sys
import threading
import time
from enum import Enum
from typing import Any, Callable, Dict, List, Optional
from urllib import request

from _orchest.internals import config as _config

# Includes "http://" and "/api"
ORCHEST_API_ADDRESS = os.environ["ORCHEST_API_ADDRESS"]
# Includes "http://"
ORCHEST_WEBSERVER_ADDRESS = os.environ["ORCHEST_WEBSERVER_ADDRESS"]
LOADING_CHARS = ["⠾", "⠷", "⠯", "⠟", "⠻", "⠽"]
RANDOM_UUID_SUFFIX_LEN = _config.ENVIRONMENT_SHELL_SUFFIX_UUID_LENGTH


class Action(Enum):
    CONNECT = 0
    START = 1
    RESTART = 2
    TERMINATE = 3
    EXIT = 4
    NONE = None


class BackgroundRequest(threading.Thread):
    def __init__(self, request_callable, callable_args):
        self.request_ready = False
        self.request_callable = request_callable
        self.callable_args = callable_args
        self.request_result = None

        super().__init__()

    def run(self):
        self.request_result = self.request_callable(*self.callable_args)
        self.request_ready = True


def print_build_warning():
    print("Could not connect to SSH in the environment. Try rebuilding ")
    print("your environment to update to the latest base image.")


def print_jupyter_warning():
    print(
        "\nUsing the JupyterLab terminal directly is not recommended "
        + "read more about it in the docs\n"
        + "➜ https://docs.orchest.io/en/stable/fundamentals/environments.html"
        + "\n\nOpen a new terminal window to get back to environment shells."
    )


def environment_shell_uuid_to_environment_uuid(environment_shell_uuid: str) -> str:
    return environment_shell_uuid[: -(RANDOM_UUID_SUFFIX_LEN + 1)]


def environment_shell_uuid_to_name(
    environment_shell_uuid: str, environments: List[Dict[str, Any]]
) -> str:
    # Cut of last random UUID
    environment_uuid = environment_shell_uuid_to_environment_uuid(
        environment_shell_uuid
    )

    # Find environment uuid in environments
    name = environment_shell_uuid
    for environment in environments:
        if environment["uuid"] == environment_uuid:
            name = (
                environment["name"]
                + " "
                + environment_shell_uuid[-RANDOM_UUID_SUFFIX_LEN:]
            )
            break

    return name


def collect_choice(choice_count: int, prefix: str):
    while True:
        try:
            choice_end = ("-" + str(choice_count - 1)) if choice_count > 1 else ""
            print(f"{prefix} [0{choice_end}]: ", end="", flush=True)
            user_input = input()
            index = int(user_input)
            if not (index >= 0 and index <= choice_count - 1):
                raise ValueError()
            return index
        except KeyboardInterrupt:
            print()
            return None
        except Exception:
            # Choose first option if only one option,
            # still run rest of the logic
            # to enable backing out using Ctrl+C KeyboardInterrupt
            if choice_count == 1:
                return 0

            print(
                f"Enter integer between 0 and {choice_count - 1}"
                " or type Ctrl + C to cancel."
            )
            continue


def get_environments() -> List[Dict[str, Any]]:
    # NOTE: Querying the orchest-webserver here instead of the
    # orchest-api is okay because Environment shells are only started
    # as part of an interactive session, which in turn is gated for
    # Environments to be present. In addition, the name of the
    # Environments is needed, which doesn't exist in the orchest-api.
    req = request.Request(
        ORCHEST_WEBSERVER_ADDRESS
        + "/store/environments/"
        + os.environ["ORCHEST_PROJECT_UUID"]
    )
    req.add_header("Content-Type", "application/json")

    resp = request.urlopen(req)
    content = resp.read()
    json_resp = json.loads(content)

    return json_resp


def get_environment_shells() -> List[Dict[str, Any]]:
    req = request.Request(
        ORCHEST_API_ADDRESS
        + "/environment-shells/?session_uuid="
        + os.environ["ORCHEST_SESSION_UUID"]
    )
    req.add_header("Content-Type", "application/json")

    resp = request.urlopen(req)
    content = resp.read()
    json_resp = json.loads(content)

    return json_resp["environment_shells"]


def get_project(project_uuid: str) -> Dict[str, Any]:
    req = request.Request(ORCHEST_WEBSERVER_ADDRESS + "/async/projects/" + project_uuid)
    req.add_header("Content-Type", "application/json")

    resp = request.urlopen(req)
    content = resp.read()
    json_resp = json.loads(content)

    return json_resp


def stop_environment_shell(
    environment_shell_uuid: str, environments: List[Dict[str, Any]]
):
    req = request.Request(
        ORCHEST_API_ADDRESS + "/environment-shells/" + environment_shell_uuid,
        method="DELETE",
    )
    req.add_header("Content-Type", "application/json")

    resp = request.urlopen(req)

    if resp.status == 200:
        print(
            "Successfully terminated "
            + environment_shell_uuid_to_name(environment_shell_uuid, environments)
        )
    else:
        print(
            "Failed to terminated "
            + environment_shell_uuid_to_name(environment_shell_uuid, environments)
            + " received status code "
            + resp.status
        )


def show_loader(
    check_ready_callable: Callable,
    callable_args: List[Any],
    prompt: str,
    timeout: float = 30,
):
    waited = 0
    wait_index = 0
    speed = 0.25
    while not check_ready_callable(*callable_args) and waited < timeout:
        if wait_index > 0:
            print("\r", end="", flush=True)

        print(
            prompt + " " + LOADING_CHARS[wait_index % (len(LOADING_CHARS) - 1)] + " ",
            end="",
            flush=True,
        )
        time.sleep(speed)
        waited += speed
        wait_index += 1

    print("")
    if waited >= timeout:
        print("Warning! Loader timed out after %d seconds." % timeout)


def _spawn_environment_shell(environment: Dict[str, Any]) -> Dict[str, Any]:
    # Get project_dir from webserver
    project = get_project(os.environ["ORCHEST_PROJECT_UUID"])

    data = json.dumps(
        {
            "project_uuid": os.environ["ORCHEST_PROJECT_UUID"],
            "pipeline_path": os.environ["ORCHEST_SESSION_PIPELINE_PATH"],
            "pipeline_uuid": os.environ["ORCHEST_PIPELINE_UUID"],
            "userdir_pvc": "userdir-pvc",
            "project_dir": "/userdir/projects/" + project["path"],
            "environment_uuid": environment["uuid"],
        }
    ).encode("utf-8")

    req = request.Request(
        ORCHEST_API_ADDRESS + "/environment-shells/", data=data, method="POST"
    )

    req.add_header("Content-Type", "application/json")

    resp = request.urlopen(req)
    content = resp.read()
    json_resp = json.loads(content)

    return json_resp


def spawn_environment_shell(environment: Dict[str, Any]) -> Dict[str, Any]:
    t = BackgroundRequest(lambda x: _spawn_environment_shell(x), [environment])
    t.start()
    show_loader(lambda x: x.request_ready, [t], "Starting shell")
    return t.request_result


def connect(environment_shell: str, environments: List[Dict[str, Any]]):
    print("Connecting...", flush=True)

    user = "jovyan"  # TODO: support more usernames
    hostname = environment_shell["hostname"]

    # Why is this reasonable from a security perspective?
    # 1) the container running the SSH daemon is only
    # cluster-network accessible
    # 2) the user already has passwordless root access to the container
    # through a Jupyter kernel or Python console session
    # 3) the user is authenticated at a network proxy level
    # when they're able to interact with the cluster.
    environment_shell_name = environment_shell_uuid_to_name(
        environment_shell["uuid"], environments
    )

    command = (
        "sshpass -p jovyan ssh "
        + "-o LogLevel=quiet "
        + "-o ForwardAgent=yes "
        + "-o StrictHostKeyChecking=no -o ConnectTimeout=5 "
        + f"-o UserKnownHostsFile=/dev/null {user}@{hostname} -t "
        + '"cd /project-dir; '
        + f'ENVIRONMENT_SHELL_NAME=\\"{environment_shell_name}\\" sudo -E su"'
    )

    co = subprocess.run(command, shell=True)
    if co.returncode == 255:
        print_build_warning()

    # Enter init loop
    init()


def choose_environment(environments: List[Dict[str, Any]]) -> Optional[Dict[str, Any]]:

    print("Available environments")
    for key, value in enumerate(environments):
        print(f"[{key}] {value['name']}")

    environment_choice = collect_choice(len(environments), "Choose environment")
    if environment_choice is not None:
        return environments[environment_choice]


def choose_action() -> Action:
    print("Choose an option")
    print("[0] Connect to an existing shell ")
    print("[1] Start a new shell ")
    print("[2] Restart an existing shell ")
    print("[3] Terminate an existing shell ")
    print("[4] Exit to JupyterLab built-in shell ")

    return Action(collect_choice(5, "Choose option"))


def choose_existing_environment_shell(
    environment_shells: List[Dict[str, Any]],
    environments: List[Dict[str, Any]],
    prompt_suffix: str,
) -> Optional[Dict[str, Any]]:
    print("Running environment shells")
    for key, environment_shell in enumerate(environment_shells):
        environment_display_name = environment_shell_uuid_to_name(
            environment_shell["uuid"], environments
        )
        print(f"[{key}] {environment_display_name}")

    environment_shell_index = collect_choice(
        len(environment_shells), "Choose environment shell " + prompt_suffix
    )

    if environment_shell_index is not None:
        return environment_shells[environment_shell_index]


def quit():
    print_jupyter_warning()
    sys.exit(0)


def init(autoconnect=False):
    environments = get_environments()
    environment_shells = get_environment_shells()

    auto_chosen = False
    action = Action.EXIT

    if len(environment_shells) == 0:
        action = Action.START
        auto_chosen = True
    elif len(environment_shells) == 1 and autoconnect:
        # Autoconnect to shell if there's only one
        connect(environment_shells[0], environments)
    elif len(environment_shells) > 1 and autoconnect:
        action = Action.CONNECT
    else:
        action = choose_action()

    if action is Action.NONE:
        quit()

    if action == Action.START:
        environment = choose_environment(environments)
        if environment is not None:
            environment_shell = spawn_environment_shell(environment)
            if environment_shell is None:
                print_build_warning()
                init()
            else:
                connect(environment_shell, environments)
        elif not auto_chosen:
            # auto_chosen is necessary to allow
            # users to back out of the
            # choose_environment menu
            # when the default action of action==0 is
            # chosen
            init()
        else:
            quit()
    elif action == Action.CONNECT:

        environment_shell = choose_existing_environment_shell(
            environment_shells, environments, "to connect to"
        )
        if environment_shell is not None:
            connect(environment_shell, environments)
        else:
            init()
    elif action == Action.RESTART:
        environment_shell = choose_existing_environment_shell(
            environment_shells, environments, "to restart"
        )

        if environment_shell is not None:
            stop_environment_shell(environment_shell["uuid"], environments)
            environment_shell = spawn_environment_shell(
                {
                    "uuid": environment_shell_uuid_to_environment_uuid(
                        environment_shell["uuid"]
                    )
                }
            )

            if environment_shell is None:
                print_build_warning()
                init()
            else:
                connect(environment_shell, environments)
        else:
            init()
    elif action == Action.TERMINATE:
        environment_shell = choose_existing_environment_shell(
            environment_shells, environments, "to terminate"
        )

        if environment_shell is not None:
            stop_environment_shell(environment_shell["uuid"], environments)

        init()
    elif action == Action.EXIT:
        quit()


if __name__ == "__main__":
    if len(sys.argv) > 1 and sys.argv[1] == "--help":
        print("This terminal supports connecting with Orchest environment shells.")
        print(
            "To restart, terminate, start or connect to "
            "another shell simply exit out of "
        )
        print("this current shell by typing 'exit' or typing Ctrl + D.")
        print()
    else:
        init(autoconnect=True)
